Work on buffer resizable
authorJeroen van der Heijden <jeroen@transceptor.technology>
Wed, 3 Oct 2018 14:57:52 +0000 (16:57 +0200)
committerJeroen van der Heijden <jeroen@transceptor.technology>
Wed, 3 Oct 2018 14:57:52 +0000 (16:57 +0200)
14 files changed:
ChangeLog-2.0.30
include/procinfo/procinfo.h
include/siri/db/buffer.h
include/siri/db/db.h
src/procinfo/procinfo.c
src/siri/backup.c
src/siri/buffersync.c
src/siri/db/buffer.c
src/siri/db/db.c
src/siri/db/insert.c
src/siri/db/props.c
src/siri/db/series.c
src/siri/db/server.c
src/siri/db/servers.c

index 3bfa16bcd96fac588c82fd295ff25dbb926e2180..0f6b6d1160fb27cb46582fa7f02e0776418e739c 100644 (file)
@@ -8,4 +8,6 @@
   
   * Added option to fsync the buffer on a configurable interval.
   
-  * Use posix_fadvise() on the buffer file. (@Svedrin)
\ No newline at end of file
+  * Use posix_fadvise() on the buffer file. (@Svedrin)
+  
+  * Refactor buffer and cleanup alternative buffer path.
\ No newline at end of file
index 1b905ef9129c6f415036cb0c7f871dc289429ded..7ad5df705a7203bec12505cd5cb7233e3b53ce24 100644 (file)
@@ -25,7 +25,7 @@ long int procinfo_total_physical_memory(void);
 long int procinfo_total_virtual_memory(void);
 
 /* Total Open Files */
-long int procinfo_open_files(const char * path);
+long int procinfo_open_files(const char * path, int include_fd);
 
 
 #endif  /* PROCINFO_H_ */
index 343c7bfabca214e9441e481b669e00ae9555b618..8b4ca624e5821529ba0b5d8fd57933fac666eba8 100644 (file)
@@ -12,6 +12,8 @@
 #ifndef SIRIDB_BUFFER_H_
 #define SIRIDB_BUFFER_H_
 
+typedef struct siridb_buffer_s siridb_buffer_t;
+
 #include <siri/db/db.h>
 #include <siri/db/series.h>
 #include <siri/db/points.h>
 
 #define MAX_BUFFER_SZ 10485760
 
+
+siridb_buffer_t * siridb_buffer_new(void);
+void siridb_buffer_free(siridb_buffer_t * buffer);
 int siridb_buffer_new_series(
-        siridb_t * siridb,
+        siridb_buffer_t * buffer,
         siridb_series_t * series);
-int siridb_buffer_open(siridb_t * siridb);
+int siridb_buffer_open(siridb_buffer_t * buffer);
 int siridb_buffer_load(siridb_t * siridb);
-void siridb_buffer_free(siridb_t * siridb);
 int siridb_buffer_write_empty(
-        siridb_t * siridb,
+        siridb_buffer_t * buffer,
         siridb_series_t * series);
 int siridb_buffer_write_last_point(
-        siridb_t * siridb,
+        siridb_buffer_t * buffer,
         siridb_series_t * series);
-int siridb_buffer_fsync(siridb_t * siridb);
 
+struct siridb_buffer_s
+{
+    size_t size;            /* size for one series inside the buffer */
+    size_t nsize;           /* optional new size from database.conf */
+    size_t len;             /* number of points allocated per series */
+    char * template;        /* template for writing an empty buffer */
+    char * path;            /* path where the buffer file is stored */
+    slist_t * empty;        /* list with empty buffer spaces */
+    FILE * fp;              /* buffer file pointer */
+    int fd;                 /* buffer file descriptor */
+};
+
+static inline int siridb_buffer_fsync(siridb_buffer_t * buffer)
+{
+    return (buffer->fp == NULL) ? 0 : fsync(buffer->fd);
+}
 
 #endif  /* SIRIDB_BUFFER_H_ */
index ec7bae6e49da5a2ab1b5fc152f2d83a82bdc89c8..a9469e7efa8faf73ab6877ec59685a764377e7e6 100644 (file)
@@ -42,6 +42,7 @@ typedef struct siridb_s siridb_t;
 #include <siri/db/groups.h>
 #include <siri/db/tasks.h>
 #include <siri/db/time.h>
+#include <siri/db/buffer.h>
 
 int32_t siridb_get_uptime(siridb_t * siridb);
 int8_t siridb_get_idle_percentage(siridb_t * siridb);
@@ -71,19 +72,15 @@ struct siridb_s
     uint32_t list_limit;
     uuid_t uuid;
     iso8601_tz_t tz;
-    size_t buffer_size;
-    size_t buffer_len;
-    char * buffer_clear;
     struct timespec start_time;     /* to calculate up-time.                */
     uint64_t duration_num;          /* number duration in s, ms, us or ns   */
     uint64_t duration_log;          /* log duration in s, ms, us or ns      */
     char * dbname;
     char * dbpath;
-    char * buffer_path;
     double drop_threshold;
     size_t received_points;
     size_t selected_points;
-    slist_t * empty_buffers;
+
     siridb_time_t * time;
     siridb_server_t * server;
     siridb_server_t * replica;
@@ -95,13 +92,13 @@ struct siridb_s
     uv_mutex_t series_mutex;
     uv_mutex_t shards_mutex;
     imap_t * shards;
-    FILE * buffer_fp;
     FILE * dropped_fp;
     qp_fpacker_t * store;
     siridb_fifo_t * fifo;
     siridb_replicate_t * replicate;
     siridb_reindex_t * reindex;
     siridb_groups_t * groups;
+    siridb_buffer_t * buffer;
     siridb_tasks_t tasks;
 };
 
index 4598332c603b563ac7050b5da63c28f25d3596a9..ea725b027035b9af01d65ce2de1ce25e53c1507c 100644 (file)
@@ -109,7 +109,7 @@ long int procinfo_total_physical_memory(void)
 #endif
 
 #ifdef __APPLE__
-long int procinfo_open_files(const char * path)
+long int procinfo_open_files(const char * path, int include_fd)
 {
     pid_t pid = getpid();
     size_t len = strlen(path);
@@ -147,15 +147,25 @@ long int procinfo_open_files(const char * path)
             if (    res == PROC_PIDFDVNODEPATHINFO_SIZE &&
                     strncmp(path, vnode_info.pvip.vip_path, len) == 0)
             {
+                vnode_info
                 count++;
             }
+            else if (
+                    res == PROC_PIDFDVNODEPATHINFO_SIZE &&
+                    include_fd >= 0 &&
+                    include_fd == fd_info[i].proc_fd)
+            {
+                include_fd = -1;
+                count++;
+            };
+
         }
     }
     free(fd_info);
     return count;
 }
 #else
-long int procinfo_open_files(const char * path)
+long int procinfo_open_files(const char * path, int include_fd)
 {
     long int count = 0;
     DIR * dirp;
@@ -174,7 +184,6 @@ long int procinfo_open_files(const char * path)
         if (entry->d_type == DT_REG || entry->d_type == DT_LNK)
         {
             snprintf(buffer, XPATH_MAX, "/proc/self/fd/%s", entry->d_name);
-
             if (realpath(buffer, buf) == NULL)
             {
                 continue;
@@ -184,6 +193,13 @@ long int procinfo_open_files(const char * path)
             {
                 count++;
             }
+            else if (
+                    include_fd >= 0 &&
+                    include_fd == strtol(entry->d_name, NULL, 10))
+            {
+                include_fd = -1;
+                count++;
+            };
         }
     }
     closedir(dirp);
index 628f33e12c4658a031de2b29919d855518a18f54..7a843372845a6f7882fd2048caddc110c3c29e29 100644 (file)
@@ -173,11 +173,11 @@ static void BACKUP_walk(siridb_t * siridb, void * args __attribute__((unused)))
         siridb_fifo_close(siridb->fifo);
     }
 
-    if (siridb->buffer_fp != NULL)
+    if (siridb->buffer->fp != NULL)
     {
-        if (fclose(siridb->buffer_fp) == 0)
+        if (fclose(siridb->buffer->fp) == 0)
         {
-            siridb->buffer_fp = NULL;
+            siridb->buffer->fp = NULL;
         }
         else
         {
index e6723dbe2584f5d12470619cf1a48bdf4492d123..07d5fa0dde801fb67f8e0415d579144dbaa493e9 100644 (file)
@@ -60,7 +60,7 @@ static void BUFFERSYNC_cb(uv_timer_t * handle __attribute__((unused)))
         siridb = (siridb_t *) siridb_node->data;
 
         /* flush the buffer, maybe on each insert or another interval? */
-        if (siridb_buffer_fsync(siridb))
+        if (siridb_buffer_fsync(siridb->buffer))
         {
             log_critical("fsync() has failed on the buffer file");
         }
index 88d923a6e43d5f8e33a7554310b5e5743b7246c1..3e5d6b1f21ce538f84a3fc6fee9a5f788215a767 100644 (file)
 /* when set to 1, no caching is done. 1 is the minimum value. */
 #define SIRIDB_BUFFER_CACHE 64
 
-static int buffer__create_new(siridb_t * siridb, siridb_series_t * series);
-static int buffer__use_empty(siridb_t * siridb, siridb_series_t * series);
+static int buffer__create_new(
+        siridb_buffer_t * buffer,
+        siridb_series_t * series);
+static int buffer__use_empty(
+        siridb_buffer_t * buffer,
+        siridb_series_t * series);
 static void buffer__migrate_to_new(char * pt, size_t sz);
 
 /* buffer__start cannot conflict with a series_id since id 0 is never used */
@@ -35,25 +39,61 @@ static const uint32_t buffer__start = 0x00000000;
 static const uint64_t buffer__end = 0xffffffffffffffff;
 
 
+siridb_buffer_t * siridb_buffer_new(void)
+{
+    siridb_buffer_t * buffer = malloc(sizeof(siridb_buffer_t));
+    if (buffer == NULL)
+    {
+        return NULL;
+    }
+    buffer->empty = slist_new(SLIST_DEFAULT_SIZE);
+    if (buffer->empty == NULL)
+    {
+        free(buffer);
+        return NULL;
+    }
+    buffer->fd = 0;
+    buffer->fp = NULL;
+    buffer->len = 0;
+    buffer->nsize = 0;
+    buffer->path = NULL;
+    buffer->size = 0;
+    buffer->template = NULL;
+
+    return buffer;
+}
+
+void siridb_buffer_free(siridb_buffer_t * buffer)
+{
+    if (buffer->fp != NULL)
+    {
+        fclose(buffer->fp);
+    }
+    free(buffer->template);
+    free(buffer->path);
+    slist_free(buffer->empty);
+    free(buffer);
+}
+
 /*
  * Returns 0 if success or EOF in case of an error.
  */
 int siridb_buffer_write_empty(
-        siridb_t * siridb,
+        siridb_buffer_t * buffer,
         siridb_series_t * series)
 {
-    memcpy(siridb->buffer_clear + 4, &series->id, sizeof(uint32_t));
+    memcpy(buffer->template + 4, &series->id, sizeof(uint32_t));
     return (
         /* go to the series position in buffer */
-        fseeko( siridb->buffer_fp,
+        fseeko( buffer->fp,
                 series->bf_offset,
                 SEEK_SET) ||
 
         /* write end ts */
-        fwrite( siridb->buffer_clear,
-                siridb->buffer_size,
+        fwrite( buffer->template,
+                buffer->size,
                 1,
-                siridb->buffer_fp) != 1) ? EOF : 0;
+                buffer->fp) != 1) ? EOF : 0;
 }
 
 /*
@@ -63,7 +103,7 @@ int siridb_buffer_write_empty(
  * Returns 0 if success or EOF in case of an error.
  */
 int siridb_buffer_write_last_point(
-        siridb_t * siridb,
+        siridb_buffer_t * buffer,
         siridb_series_t * series)
 {
     siridb_point_t * point;
@@ -79,67 +119,62 @@ int siridb_buffer_write_last_point(
 
     return (
         /* jump to position where to write the new point */
-        fseeko( siridb->buffer_fp,
+        fseeko( buffer->fp,
                 series->bf_offset + 8 + (16 * last_idx),
                 SEEK_SET) ||
 
         /* write time-stamp and value */
-        fwrite(buf, sz, 1, siridb->buffer_fp) != 1) ? EOF : 0;
+        fwrite(buf, sz, 1, buffer->fp) != 1) ? EOF : 0;
 }
 
 /*
  * Returns 0 if successful; -1 and a SIGNAL is raised in case an error occurred.
  */
-int siridb_buffer_new_series(siridb_t * siridb, siridb_series_t * series)
+int siridb_buffer_new_series(
+        siridb_buffer_t * buffer,
+        siridb_series_t * series)
 {
     /* allocate new buffer */
-    series->buffer = siridb_points_new(siridb->buffer_len, series->tp);
+    series->buffer = siridb_points_new(buffer->len, series->tp);
     if (series->buffer == NULL)
     {
         return -1;  /* signal is raised */
     }
 
-    return (siridb->empty_buffers->len) ?
-            buffer__use_empty(siridb, series) :
-            buffer__create_new(siridb, series);
-}
-
-int siridb_buffer_fsync(siridb_t * siridb)
-{
-    if (siridb->buffer_fp == NULL)
-    {
-        return 0;
-    }
-    int buffer_fd = fileno(siridb->buffer_fp);
-    return (buffer_fd != -1) ? fsync(buffer_fd) : -1;
+    return (buffer->empty->len) ?
+            buffer__use_empty(buffer, series) :
+            buffer__create_new(buffer, series);
 }
 
 /*
  * Returns 0 if successful or -1 in case of an error.
  */
-int siridb_buffer_open(siridb_t * siridb)
+int siridb_buffer_open(siridb_buffer_t * buffer)
 {
-    int buffer_fd, rc;
-    siridb_misc_get_fn(fn, siridb->buffer_path, SIRIDB_BUFFER_FN)
+    const int flags = POSIX_FADV_RANDOM | POSIX_FADV_DONTNEED;
+    int rc;
+    siridb_misc_get_fn(fn, buffer->path, SIRIDB_BUFFER_FN)
 
-    if ((siridb->buffer_fp = fopen(fn, "r+")) == NULL)
+    if ((buffer->fp = fopen(fn, "r+")) == NULL)
     {
         log_critical("Cannot open '%s' for reading and writing", fn);
         return -1;
     }
 
-    buffer_fd = fileno(siridb->buffer_fp);
+    buffer->fd = fileno(buffer->fp);
 
-    if (buffer_fd == -1)
+    if (buffer->fd == -1)
     {
         log_critical("Cannot get file descriptor: '%s'", fn);
+        fclose(buffer->fp);
+        buffer->fp = NULL;
         return -1;
     }
 
 #ifdef __APPLE__
     rc = 0;  /* no posix_fadvise on apple */
 #else
-    rc = posix_fadvise(buffer_fd, 0, 0, POSIX_FADV_RANDOM|POSIX_FADV_DONTNEED);
+    rc = posix_fadvise(buffer->fd, 0, 0, flags);
     if (rc)
     {
         log_warning("Cannot set advice for file access: '%s' (%d)", fn, rc);
@@ -177,11 +212,12 @@ static void buffer__migrate_to_new(char * pt, size_t sz)
  */
 int siridb_buffer_load(siridb_t * siridb)
 {
+    siridb_buffer_t * buffer = siridb->buffer;
     FILE * fp;
     FILE * fp_temp;
     size_t read_at_once = 8;
     size_t num, i;
-    char buffer[siridb->buffer_size * read_at_once];
+    char buf[buffer->size * read_at_once];
     char * pt, * end;
     long int offset = 0;
     siridb_series_t * series;
@@ -191,25 +227,25 @@ int siridb_buffer_load(siridb_t * siridb)
 
     log_info("Loading and cleanup buffer");
 
-    siridb->buffer_clear = malloc(siridb->buffer_size);
-    if (siridb->buffer_clear == NULL)
+    buffer->template = malloc(buffer->size);
+    if (buffer->template == NULL)
     {
         log_critical("Allocation error while loading buffer");
         return -1;
     }
 
-    for (   pt = siridb->buffer_clear,
-            end = siridb->buffer_clear + siridb->buffer_size;
+    for (   pt = buffer->template,
+            end = buffer->template + buffer->size;
             pt < end;
             pt += sizeof(uint64_t))
     {
         memcpy(pt, &buffer__end, sizeof(uint64_t));
     }
 
-    memcpy(siridb->buffer_clear, &buffer__start, sizeof(uint32_t));
+    memcpy(buffer->template, &buffer__start, sizeof(uint32_t));
 
-    siridb_misc_get_fn(fn, siridb->buffer_path, SIRIDB_BUFFER_FN)
-    siridb_misc_get_fn(fn_temp, siridb->buffer_path, "__" SIRIDB_BUFFER_FN)
+    siridb_misc_get_fn(fn, buffer->path, SIRIDB_BUFFER_FN)
+    siridb_misc_get_fn(fn_temp, buffer->path, "__" SIRIDB_BUFFER_FN)
 
     if (xpath_file_exist(fn_temp))
     {
@@ -243,11 +279,11 @@ int siridb_buffer_load(siridb_t * siridb)
         return -1;
     }
 
-    while ((num = fread(buffer, siridb->buffer_size, read_at_once, fp)))
+    while ((num = fread(buf, buffer->size, read_at_once, fp)))
     {
         for (i = 0; i < num; i++)
         {
-            pt = buffer + i * siridb->buffer_size;
+            pt = buf + i * buffer->size;
 
             buf_start = *((uint32_t *) pt);
             if (buf_start != buffer__start)
@@ -257,7 +293,7 @@ int siridb_buffer_load(siridb_t * siridb)
                     log_warning("Buffer will be migrated");
                     log_migrate = 0;
                 }
-                buffer__migrate_to_new(pt, siridb->buffer_size);
+                buffer__migrate_to_new(pt, buffer->size);
             }
 
             pt += sizeof(uint32_t);
@@ -271,7 +307,7 @@ int siridb_buffer_load(siridb_t * siridb)
                 continue;
             }
 
-            series->buffer = siridb_points_new(siridb->buffer_len, series->tp);
+            series->buffer = siridb_points_new(buffer->len, series->tp);
             if (series->buffer == NULL)
             {
                 log_critical("Cannot allocate a buffer for series id %u",
@@ -290,14 +326,13 @@ int siridb_buffer_load(siridb_t * siridb)
                 siridb_points_add_point(series->buffer, ts, val);
             }
 
-            offset += siridb->buffer_size;
+            offset += buffer->size;
 
             /* increment series->length which is 0 at this time */
             series->length += series->buffer->len;
 
             /* write to output file and check if write was successful */
-            if ((fwrite(buffer + i * siridb->buffer_size,
-                    siridb->buffer_size, 1, fp_temp) != 1))
+            if ((fwrite(buf + i*buffer->size, buffer->size, 1, fp_temp) != 1))
             {
                 log_critical("Could not write to temporary buffer file: '%s'",
                         fn_temp);
@@ -320,17 +355,6 @@ int siridb_buffer_load(siridb_t * siridb)
     return 0;
 }
 
-void siridb_buffer_free(siridb_t * siridb)
-{
-    if (siridb->buffer_fp != NULL)
-    {
-        fclose(siridb->buffer_fp);
-        siridb->buffer_fp = NULL;
-    }
-    free(siridb->buffer_clear);
-    siridb->buffer_clear = NULL;
-}
-
 /*
  * Reserve a space in the buffer for a new series. The position of this space
  * in the buffer is read from siridb->empty_buffers so this list must have
@@ -341,11 +365,13 @@ void siridb_buffer_free(siridb_t * siridb)
  * Note that an available spot must be checked before calling this function.
  * This functions has undefined behavior if no spot is found.
  */
-static int buffer__use_empty(siridb_t * siridb, siridb_series_t * series)
+static int buffer__use_empty(
+        siridb_buffer_t * buffer,
+        siridb_series_t * series)
 {
-    series->bf_offset = (long int) slist_pop(siridb->empty_buffers);
+    series->bf_offset = (long int) slist_pop(buffer->empty);
 
-    if (siridb_buffer_write_empty(siridb, series))
+    if (siridb_buffer_write_empty(buffer, series))
     {
         ERR_FILE
         return -1;
@@ -361,58 +387,53 @@ static int buffer__use_empty(siridb_t * siridb, siridb_series_t * series)
  *
  * Returns 0 if successful or -1 and a signal is raised in case of an error.
  */
-static int buffer__create_new(siridb_t * siridb, siridb_series_t * series)
+static int buffer__create_new(
+        siridb_buffer_t * buffer,
+        siridb_series_t * series)
 {
     long int buffer_pos;
-    /* get file descriptor */
-    int buffer_fd = fileno(siridb->buffer_fp);
 
-    if (buffer_fd == -1)
-    {
-        ERR_FILE
-        return -1;
-    }
 
     /* jump to end of buffer */
-    if (fseeko(siridb->buffer_fp, 0, SEEK_END))
+    if (fseeko(buffer->fp, 0, SEEK_END))
     {
         ERR_FILE
         return -1;
     }
 
     /* bind the current offset to the new series */
-    if ((series->bf_offset = ftello(siridb->buffer_fp)) == -1)
+    if ((series->bf_offset = ftello(buffer->fp)) == -1)
     {
         ERR_FILE
         return -1;
     }
 
     /* write buffer start and series ID to buffer */
-    if (siridb_buffer_write_empty(siridb, series))
+    if (siridb_buffer_write_empty(buffer, series))
     {
         ERR_FILE
         return -1;
     }
 
-    buffer_pos = series->bf_offset + siridb->buffer_size * SIRIDB_BUFFER_CACHE;
+    buffer_pos = series->bf_offset + buffer->size * SIRIDB_BUFFER_CACHE;
 
     /* fill buffer with zeros if possible */
-    if (ftruncate(buffer_fd, buffer_pos))
+    if (ftruncate(buffer->fd, buffer_pos))
     {
         ERR_FILE
         return -1;
     }
 
     /* commit changes to disk */
-    if (fsync(buffer_fd))
+    if (fsync(buffer->fd))
     {
         ERR_FILE
         return -1;
     }
 
-    while ((buffer_pos -= siridb->buffer_size) > series->bf_offset)
+    while ((buffer_pos -= buffer->size) > series->bf_offset)
     {
-        slist_append_safe(&siridb->empty_buffers, (void *) buffer_pos);
+        slist_append_safe(&buffer->empty, (void *) buffer_pos);
     }
 
     return 0;
index 7946d0773b94fcb8e12c1930493d82acfb483249..3e94fe8d150d5b78440cd54221008b3c9cc885a8 100644 (file)
  *
  */
 
-static siridb_t * SIRIDB_new(void);
-
-static int SIRIDB_from_unpacker(
+static siridb_t * siridb__new(void);
+static int siridb__from_unpacker(
         qp_unpacker_t * unpacker,
         siridb_t ** siridb,
         const char * dbpath,
         char * err_msg);
+static siridb_t * siridb__from_dat(const char * dbpath);
+static int siridb__read_conf(siridb_t * siridb);
+static int siridb__lock(const char * dbpath, int lock_flags);
 
 #define READ_DB_EXIT_WITH_ERROR(ERROR_MSG)  \
     strcpy(err_msg, ERROR_MSG);             \
@@ -85,7 +87,6 @@ int8_t siridb_get_idle_percentage(siridb_t * siridb)
     return (idle > 100) ? 100 : idle;
 }
 
-
 /*
  * Check if at least database.conf and database.dat exist in the path.
  */
@@ -120,14 +121,7 @@ int siridb_is_db_path(const char * dbpath)
 siridb_t * siridb_new(const char * dbpath, int lock_flags)
 {
     size_t len = strlen(dbpath);
-    lock_t lock_rc;
-    char buffer[XPATH_MAX];
-    cfgparser_t * cfgparser;
-    cfgparser_option_t * option = NULL;
-    qp_unpacker_t * unpacker;
     siridb_t * siridb;
-    char err_msg[512];
-    int rc;
     size_t i;
 
     if (!len || dbpath[len - 1] != '/')
@@ -143,118 +137,25 @@ siridb_t * siridb_new(const char * dbpath, int lock_flags)
         return NULL;
     }
 
-    lock_rc = lock_lock(dbpath, lock_flags);
-
-    switch (lock_rc)
-    {
-    case LOCK_IS_LOCKED_ERR:
-    case LOCK_PROCESS_NAME_ERR:
-    case LOCK_WRITE_ERR:
-    case LOCK_READ_ERR:
-    case LOCK_MEM_ALLOC_ERR:
-        log_error("%s (%s)", lock_str(lock_rc), dbpath);
-        return NULL;
-    case LOCK_NEW:
-        log_info("%s (%s)", lock_str(lock_rc), dbpath);
-        break;
-    case LOCK_OVERWRITE:
-        log_warning("%s (%s)", lock_str(lock_rc), dbpath);
-        break;
-    default:
-        assert (0);
-        break;
-    }
-
-    /* read database.conf */
-    snprintf(buffer,
-            XPATH_MAX,
-            "%sdatabase.conf",
-            dbpath);
-
-    cfgparser = cfgparser_new();
-    if (cfgparser == NULL)
-    {
-        return NULL;  /* signal is raised */
-    }
-    if ((rc = cfgparser_read(cfgparser, buffer)) != CFGPARSER_SUCCESS)
-    {
-        log_error("Could not read '%s': %s",
-                buffer,
-                cfgparser_errmsg(rc));
-        cfgparser_free(cfgparser);
-        return NULL;
-    }
-
-    snprintf(buffer,
-            XPATH_MAX,
-            "%sdatabase.dat",
-            dbpath);
-
-    if ((unpacker = qp_unpacker_ff(buffer)) == NULL)
-    {
-        /* qp_unpacker has done some logging */
-        cfgparser_free(cfgparser);
-        return NULL;
-    }
-
-    if ((rc = SIRIDB_from_unpacker(
-            unpacker,
-            &siridb,
-            dbpath,
-            err_msg)) < 0)
+    if (siridb__lock(dbpath, lock_flags))
     {
-        log_error("Could not read '%s': %s", buffer, err_msg);
-        qp_unpacker_ff_free(unpacker);
-        cfgparser_free(cfgparser);
+        log_error("Cannot lock database path '%s'", dbpath);
         return NULL;
     }
 
-    qp_unpacker_ff_free(unpacker);
-
-    if (rc > 0 && siridb_save(siridb))
+    siridb = siridb__from_dat(dbpath);
+    if (siridb == NULL)
     {
-        log_error("Could not write file: %s", buffer);
-        cfgparser_free(cfgparser);
-        siridb_decref(siridb);
+        log_error("Cannot load SiriDB from database path '%s'", dbpath);
         return NULL;
     }
 
     log_info("Start loading database: '%s'", siridb->dbname);
 
-    /* read buffer_path from database.conf */
-    rc = cfgparser_get_option(
-                &option,
-                cfgparser,
-                "buffer",
-                "path");
-
-    if (rc == CFGPARSER_SUCCESS && option->tp == CFGPARSER_TP_STRING)
-    {
-        len = strlen(option->val->string);
-        siridb->buffer_path = NULL;
-        if (option->val->string[len - 1] == '/')
-        {
-            siridb->buffer_path = strdup(option->val->string);
-        }
-        else if (asprintf(
-                &siridb->buffer_path,
-                "%s/",
-                option->val->string) < 0)
-        {
-            siridb->buffer_path = NULL;
-        }
-    }
-    else
-    {
-        siridb->buffer_path = siridb->dbpath;
-    }
-
-    /* free cfgparser */
-    cfgparser_free(cfgparser);
-
-    if (siridb->buffer_path == NULL)
+    /* read database.conf */
+    if (siridb__read_conf(siridb))
     {
-        ERR_ALLOC
+        log_error("Could not read config for database '%s'", siridb->dbname);
         siridb_decref(siridb);
         return NULL;
     }
@@ -283,26 +184,26 @@ siridb_t * siridb_new(const char * dbpath, int lock_flags)
         return NULL;
     }
 
-    /* load buffer */
-    if (siridb_buffer_load(siridb))
+    /* load shards */
+    if (siridb_shards_load(siridb))
     {
-        log_error("Could not read buffer for database '%s'", siridb->dbname);
+        log_error("Could not read shards for database '%s'", siridb->dbname);
         siridb_decref(siridb);
         return NULL;
     }
 
-    /* open buffer */
-    if (siridb_buffer_open(siridb))
+    /* load buffer */
+    if (siridb_buffer_load(siridb))
     {
-        log_error("Could not open buffer for database '%s'", siridb->dbname);
+        log_error("Could not read buffer for database '%s'", siridb->dbname);
         siridb_decref(siridb);
         return NULL;
     }
 
-    /* load shards */
-    if (siridb_shards_load(siridb))
+    /* open buffer */
+    if (siridb_buffer_open(siridb->buffer))
     {
-        log_error("Could not read shards for database '%s'", siridb->dbname);
+        log_error("Could not open buffer for database '%s'", siridb->dbname);
         siridb_decref(siridb);
         return NULL;
     }
@@ -381,7 +282,7 @@ siridb_t * siridb_new(const char * dbpath, int lock_flags)
  *
  * (a SIGNAL can be set in case of an error)
  */
-static int SIRIDB_from_unpacker(
+static int siridb__from_unpacker(
         qp_unpacker_t * unpacker,
         siridb_t ** siridb,
         const char * dbpath,
@@ -417,7 +318,7 @@ static int SIRIDB_from_unpacker(
     }
 
     /* create a new SiriDB structure */
-    *siridb = SIRIDB_new();
+    *siridb = siridb__new();
     if (*siridb == NULL)
     {
         sprintf(err_msg, "Cannot create SiriDB instance.");
@@ -478,8 +379,8 @@ static int SIRIDB_from_unpacker(
     }
 
     /* bind buffer size and len to SiriDB */
-    (*siridb)->buffer_size = (size_t) qp_obj.via.int64;
-    (*siridb)->buffer_len = (*siridb)->buffer_size / sizeof(siridb_point_t);
+    (*siridb)->buffer->size = (size_t) qp_obj.via.int64;
+    (*siridb)->buffer->len = (*siridb)->buffer->size / sizeof(siridb_point_t);
 
     /* read number duration  */
     if (qp_next(unpacker, &qp_obj) != QP_INT64)
@@ -618,18 +519,10 @@ ssize_t siridb_get_file(char ** buffer, siridb_t * siridb)
  */
 int siridb_open_files(siridb_t * siridb)
 {
-    int open_files = procinfo_open_files(siridb->dbpath);
-
-    if (    siridb->buffer_path != siridb->dbpath &&
-            strncmp(
-                siridb->dbpath,
-                siridb->buffer_path,
-                strlen(siridb->dbpath)))
-    {
-        open_files += procinfo_open_files(siridb->buffer_path);
-    }
-
-    return open_files;
+    siridb_buffer_t * buffer = siridb->buffer;
+    return procinfo_open_files(
+            siridb->dbpath,
+            (buffer->fp == NULL) ? -1 : buffer->fd);
 }
 
 /*
@@ -655,7 +548,7 @@ int siridb_save(siridb_t * siridb)
             qp_fadd_raw(fpacker, (const unsigned char *) siridb->uuid, 16) ||
             qp_fadd_string(fpacker, siridb->dbname) ||
             qp_fadd_int8(fpacker, siridb->time->precision) ||
-            qp_fadd_int64(fpacker, siridb->buffer_size) ||
+            qp_fadd_int64(fpacker, siridb->buffer->size) ||
             qp_fadd_int64(fpacker, siridb->duration_num) ||
             qp_fadd_int64(fpacker, siridb->duration_log) ||
             qp_fadd_string(fpacker, iso8601_tzname(siridb->tz)) ||
@@ -678,7 +571,10 @@ void siridb__free(siridb_t * siridb)
 #endif
 
     /* first we should close the buffer and all other open files */
-    siridb_buffer_free(siridb);
+    if (siridb->buffer != NULL)
+    {
+        siridb_buffer_free(siridb->buffer);
+    }
 
     if (siridb->dropped_fp != NULL)
     {
@@ -696,9 +592,6 @@ void siridb__free(siridb_t * siridb)
         siridb_users_free(siridb->users);
     }
 
-    /* free buffer positions */
-    slist_free(siridb->empty_buffers);
-
     /* we do not need to free server and replica since they exist in
      * this list and therefore will be freed.
      */
@@ -753,12 +646,6 @@ void siridb__free(siridb_t * siridb)
         imap_free(siridb->shards, (imap_free_cb) &siridb__shard_decref);
     }
 
-    /* only free buffer path when not equal to db_path */
-    if (siridb->buffer_path != siridb->dbpath)
-    {
-        free(siridb->buffer_path);
-    }
-
     if (siridb->groups != NULL)
     {
         siridb_groups_decref(siridb->groups);
@@ -787,7 +674,7 @@ void siridb__free(siridb_t * siridb)
 /*
  * Returns NULL and raises a SIGNAL in case an error has occurred.
  */
-static siridb_t * SIRIDB_new(void)
+static siridb_t * siridb__new(void)
 {
     siridb_t * siridb = (siridb_t *) malloc(sizeof(siridb_t));
     if (siridb == NULL)
@@ -827,9 +714,9 @@ static siridb_t * SIRIDB_new(void)
                 }
                 else
                 {
-                    /* allocate a list for buffer positions */
-                    siridb->empty_buffers = slist_new(SLIST_DEFAULT_SIZE);
-                    if (siridb->empty_buffers == NULL)
+                    /* allocate a buffer */
+                    siridb->buffer = siridb_buffer_new();
+                    if (siridb->buffer == NULL)
                     {
                         imap_free(siridb->shards, NULL);
                         imap_free(siridb->series_map, NULL);
@@ -845,7 +732,6 @@ static siridb_t * SIRIDB_new(void)
                         siridb->ref = 1;
                         siridb->insert_tasks = 0;
                         siridb->flags = 0;
-                        siridb->buffer_path = NULL;
                         siridb->time = NULL;
                         siridb->users = NULL;
                         siridb->servers = NULL;
@@ -856,7 +742,6 @@ static siridb_t * SIRIDB_new(void)
                         siridb->drop_threshold = DEF_DROP_THRESHOLD;
                         siridb->select_points_limit = DEF_SELECT_POINTS_LIMIT;
                         siridb->list_limit = DEF_LIST_LIMIT;
-                        siridb->buffer_size = -1;
                         siridb->tz = -1;
                         siridb->server = NULL;
                         siridb->replica = NULL;
@@ -866,7 +751,6 @@ static siridb_t * SIRIDB_new(void)
                         siridb->groups = NULL;
 
                         /* make file pointers are NULL when file is closed */
-                        siridb->buffer_fp = NULL;
                         siridb->dropped_fp = NULL;
                         siridb->store = NULL;
 
@@ -880,6 +764,137 @@ static siridb_t * SIRIDB_new(void)
     return siridb;
 }
 
+static siridb_t * siridb__from_dat(const char * dbpath)
+{
+    int rc;
+    siridb_t * siridb = NULL;
+    char err_msg[512];
+    qp_unpacker_t * unpacker;
+    char buffer[XPATH_MAX];
+
+    snprintf(buffer,
+                XPATH_MAX,
+                "%sdatabase.dat",
+                dbpath);
+
+    unpacker = qp_unpacker_ff(buffer);
+    if (unpacker == NULL)
+    {
+        return NULL;
+    }
+
+    if ((rc = siridb__from_unpacker(
+            unpacker,
+            &siridb,
+            dbpath,
+            err_msg)) < 0)
+    {
+        log_error("Could not read '%s': %s", buffer, err_msg);
+        qp_unpacker_ff_free(unpacker);
+        return NULL;
+    }
+
+    qp_unpacker_ff_free(unpacker);
+
+    if (rc > 0 && siridb_save(siridb))
+    {
+        log_error("Could not write file: %s", buffer);
+        siridb_decref(siridb);
+        return NULL;
+    }
+
+    return siridb;
+}
+
+static int siridb__read_conf(siridb_t * siridb)
+{
+    int rc;
+    char buf[XPATH_MAX];
+    cfgparser_t * cfgparser;
+    cfgparser_option_t * option = NULL;
+    siridb_buffer_t * buffer = siridb->buffer;
+    snprintf(buf,
+            XPATH_MAX,
+            "%sdatabase.conf",
+            siridb->dbpath);
+
+    cfgparser = cfgparser_new();
+    if (cfgparser == NULL)
+    {
+        return -1;  /* signal is raised */
+    }
+
+    rc = cfgparser_read(cfgparser, buf);
+
+    if (rc != CFGPARSER_SUCCESS)
+    {
+        log_error("Could not read '%s': %s", buf, cfgparser_errmsg(rc));
+        cfgparser_free(cfgparser);
+        return -1;
+    }
+
+    /* read buffer_path from database.conf */
+    rc = cfgparser_get_option(&option, cfgparser, "buffer", "path");
+    if (rc == CFGPARSER_SUCCESS && option->tp == CFGPARSER_TP_STRING)
+    {
+        size_t len = strlen(option->val->string);
+        buffer->path = NULL;
+        if (option->val->string[len - 1] == '/')
+        {
+            buffer->path = strdup(option->val->string);
+        }
+        else if (
+                len >= 11 &&
+                strcmp(option->val->string + (len-11), "/buffer.dat") == 0)
+        {
+            buffer->path = strndup(option->val->string, len-10);
+        }
+        else if (asprintf(&buffer->path, "%s/", option->val->string) < 0)
+        {
+            buffer->path = NULL;
+        }
+    }
+    else
+    {
+        buffer->path = strdup(siridb->dbpath);
+    }
+
+    rc = cfgparser_get_option(&option, cfgparser, "buffer", "size");
+    if (rc == CFGPARSER_SUCCESS && option->tp == CFGPARSER_TP_INTEGER)
+    {
+
+    }
+
+    cfgparser_free(cfgparser);
+
+    return (buffer->path == NULL) ? -1 : 0;
+}
+
+static int siridb__lock(const char * dbpath, int lock_flags)
+{
+    lock_t lock_rc = lock_lock(dbpath, lock_flags);
+
+    switch (lock_rc)
+    {
+    case LOCK_IS_LOCKED_ERR:
+    case LOCK_PROCESS_NAME_ERR:
+    case LOCK_WRITE_ERR:
+    case LOCK_READ_ERR:
+    case LOCK_MEM_ALLOC_ERR:
+        log_error("%s (%s)", lock_str(lock_rc), dbpath);
+        return -1;
+    case LOCK_NEW:
+        log_info("%s (%s)", lock_str(lock_rc), dbpath);
+        break;
+    case LOCK_OVERWRITE:
+        log_warning("%s (%s)", lock_str(lock_rc), dbpath);
+        break;
+    default:
+        assert (0);
+        break;
+    }
+    return 0;
+}
 
 
 
index b8ef59e79327d75ebbb39fce6f71b6a02c81a4b9..6ca4b23f4f840c79bd6540600e997f2739d65d9d 100644 (file)
@@ -904,7 +904,7 @@ static void INSERT_local_task(uv_async_t * handle)
 
     siridb = ilocal->siridb;
 
-    if (siridb->buffer_fp == NULL && siridb_buffer_open(siridb))
+    if (siridb->buffer->fp == NULL && siridb_buffer_open(siridb->buffer))
     {
         ERR_FILE
         ilocal->status = INSERT_LOCAL_ERROR;
@@ -956,7 +956,7 @@ static void INSERT_local_task(uv_async_t * handle)
 
     if (siri.buffersync == NULL)
     {
-        if (siridb_buffer_fsync(siridb))
+        if (siridb_buffer_fsync(siridb->buffer))
         {
             log_critical("fsync() has failed on the buffer file");
         }
index 33977059ba957dd1e742eb0d2e1ca68cd5bcd38c..dcfbe980e37d1d9e18d06895a0b3e7dcb0a423a4 100644 (file)
@@ -277,7 +277,7 @@ static void prop_buffer_path(
         int map)
 {
     SIRIDB_PROP_MAP("buffer_path", 11)
-    qp_add_string(packer, siridb->buffer_path);
+    qp_add_string(packer, siridb->buffer->path);
 }
 
 static void prop_buffer_size(
@@ -286,7 +286,7 @@ static void prop_buffer_size(
         int map)
 {
     SIRIDB_PROP_MAP("buffer_size", 11)
-    qp_add_int32(packer, (int32_t) siridb->buffer_size);
+    qp_add_int32(packer, (int32_t) siridb->buffer->size);
 }
 
 static void prop_dbname(
index 2d8b36277b5d2b60c4ed5ae2a49b40c11ab37168..2e3d9db347f0e72fa37c5e6109470282668de8ce 100644 (file)
@@ -133,7 +133,7 @@ int siridb_series_add_point(
      */
     siridb_points_add_point(series->buffer, ts, val);
 
-    if (series->buffer->len == siridb->buffer_len)
+    if (series->buffer->len == siridb->buffer->len)
     {
         if (siridb_shards_add_points(
                 siridb,
@@ -145,7 +145,7 @@ int siridb_series_add_point(
         else
         {
             series->buffer->len = 0;
-            if (siridb_buffer_write_empty(siridb, series))
+            if (siridb_buffer_write_empty(siridb->buffer, series))
             {
                 ERR_FILE
                 rc = -1;
@@ -154,7 +154,7 @@ int siridb_series_add_point(
     }
     else
     {
-        if (siridb_buffer_write_last_point(siridb, series))
+        if (siridb_buffer_write_last_point(siridb->buffer, series))
         {
             ERR_FILE
             log_critical("Cannot write new point to buffer");
@@ -181,7 +181,7 @@ int siridb_series_add_pcache(
         siridb_series_t *__restrict series,
         siridb_pcache_t *__restrict pcache)
 {
-    if (pcache->len > siridb->buffer_len || series->buffer == NULL)
+    if (pcache->len > siridb->buffer->len || series->buffer == NULL)
     {
         series->length += pcache->len;
 
@@ -191,7 +191,7 @@ int siridb_series_add_pcache(
                 (siridb_points_t *) pcache);
     }
 
-    if (pcache->len + series->buffer->len > siridb->buffer_len)
+    if (pcache->len + series->buffer->len > siridb->buffer->len)
     {
         series->length += pcache->len;
 
@@ -217,7 +217,7 @@ int siridb_series_add_pcache(
         }
 
         series->buffer->len = 0;
-        if (siridb_buffer_write_empty(siridb, series))
+        if (siridb_buffer_write_empty(siridb->buffer, series))
         {
             ERR_FILE
             return -1;
@@ -287,7 +287,7 @@ siridb_series_t * siridb_series_new(
     }
 
     /* create a buffer for series (except string series) */
-    if (tp != TP_STRING && siridb_buffer_new_series(siridb, series))
+    if (tp != TP_STRING && siridb_buffer_new_series(siridb->buffer, series))
     {
         /* signal is raised */
         log_critical("Could not create buffer for series '%s'.",
@@ -354,7 +354,7 @@ void siridb__series_free(siridb_series_t *__restrict series)
         if (series->flags & SIRIDB_SERIES_IS_DROPPED)
         {
             slist_append_safe(
-                &series->siridb->empty_buffers,
+                &series->siridb->buffer->empty,
                 (void *) series->bf_offset);
         }
     }
index 6b994e4d4e351342cc8d683465d5de09bf2129d3..778c150bdda4855205a52c89322733dce7d3dc79 100644 (file)
@@ -711,8 +711,8 @@ static void SERVER_on_connect(uv_connect_t * req, int status)
                 qp_add_int8(packer, siri.cfg->ip_support) ||
                 qp_add_string_term(packer, uv_version_string()) ||
                 qp_add_string_term(packer, siridb->dbpath) ||
-                qp_add_string_term(packer, siridb->buffer_path) ||
-                qp_add_int64(packer, (int64_t) siridb->buffer_size) ||
+                qp_add_string_term(packer, siridb->buffer->path) ||
+                qp_add_int64(packer, (int64_t) siridb->buffer->size) ||
                 qp_add_int32(packer, (int32_t) siri.startup_time) ||
                 qp_add_string_term(packer, siridb->server->address) ||
                 qp_add_int32(packer, (int32_t) siridb->server->port))
@@ -1013,7 +1013,7 @@ int siridb_server_cexpr_cb(
         return cexpr_str_cmp(
                 cond->operator,
                 (wserver->siridb->server == wserver->server) ?
-                        wserver->siridb->buffer_path :
+                        wserver->siridb->buffer->path :
                         (wserver->server->buffer_path != NULL) ?
                                 wserver->server->buffer_path : "",
                 cond->str);
@@ -1022,7 +1022,7 @@ int siridb_server_cexpr_cb(
         return cexpr_int_cmp(
                 cond->operator,
                 (wserver->siridb->server == wserver->server) ?
-                        wserver->siridb->buffer_size :
+                        wserver->siridb->buffer->size :
                         wserver->server->buffer_size,
                 cond->int64);
 
index 2342e57141623dd2a00019c2c793b52ad88486e7..9c6aea6010e32b8d62106b05f9ac9ae7855039c5 100644 (file)
@@ -574,7 +574,7 @@ int siridb_servers_list(siridb_server_t * server, uv_async_t * handle)
             qp_add_string(
                     query->packer,
                     (siridb->server == server) ?
-                            siridb->buffer_path :
+                            siridb->buffer->path :
                             (server->buffer_path != NULL) ?
                                     server->buffer_path : "");
             break;
@@ -582,7 +582,7 @@ int siridb_servers_list(siridb_server_t * server, uv_async_t * handle)
             qp_add_int64(
                     query->packer,
                     (siridb->server == server) ?
-                            siridb->buffer_size : server->buffer_size);
+                            siridb->buffer->size : server->buffer_size);
             break;
         case CLERI_GID_K_DBPATH:
             qp_add_string(